﻿using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client;
using System.Text.RegularExpressions;
using RabbitMQ.Client.Exceptions;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO;
using PServiceBus.Core.Runtime;
using PServiceBus.Core.Runtime.Extensions;
using PServiceBus.Core.Logger;
using PServiceBus.Core.Interface;
using System.Collections;
using System.Collections.Concurrent;

namespace PServiceBus.RabbitMQ {
    public sealed class Provider<TObject> : IHaveEndpoint, IObjectProvider<TObject> where TObject : class {
       
        // Guards channel usage issued through Query for this provider instance.
        // readonly: a lock target must never be swapped out while another thread holds it.
        private readonly object _lockObject = new object();
        // Caches shared by every instance of this closed generic type (static members of a
        // generic class exist once per TObject). The references are never reassigned; only
        // their contents change, so they are readonly.
        private static readonly ConcurrentDictionary<string, IConnection> _connections = new ConcurrentDictionary<string, IConnection>();
        private static readonly ConcurrentDictionary<string, IModel> _models = new ConcurrentDictionary<string, IModel>();
        private static readonly ConcurrentDictionary<string, ProviderFactory> _factories = new ConcurrentDictionary<string, ProviderFactory>();
        // Providers handed out by GetProvider; stored as object and cast back on retrieval.
        private static readonly ConcurrentDictionary<string, object> _providers = new ConcurrentDictionary<string, object>();


        /// <summary>
        /// Returns the cached provider for the given endpoint configuration, creating it on first use.
        /// </summary>
        /// <param name="endpoint">Endpoint address of the broker/queue. Must not be null.</param>
        /// <param name="appendTypeNameToEndpoint">When true, the TObject type name becomes part of the queue path.</param>
        /// <param name="endpointAlias">Optional suffix appended to the queue path.</param>
        /// <returns>A provider configured with the supplied values.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="endpoint"/> is null.</exception>
        /// <remarks>
        /// The default IDFunc produces a new Guid on every call, so id-based lookups
        /// (Exists/Get/Delete by object) only match when the caller assigns a stable IDFunc.
        /// </remarks>
        public static Provider<TObject> GetProvider(string endpoint, bool appendTypeNameToEndpoint = false, string endpointAlias = null) {
            if (endpoint == null) throw new ArgumentNullException("endpoint");
            // The cache key must cover every value that shapes the provider; keying on the endpoint
            // alone returned a provider bound to the wrong alias/type-name setting on later calls.
            var cacheKey = String.Concat(endpoint, "|", endpointAlias ?? string.Empty, "|", appendTypeNameToEndpoint ? "1" : "0");
            return (Provider<TObject>)_providers.GetOrAdd(cacheKey, key =>
                new Provider<TObject> {
                    Endpoint = endpoint,
                    IDFunc = x => Guid.NewGuid(),
                    EndpointAlias = endpointAlias,
                    AppendTypeNameToEndpoint = appendTypeNameToEndpoint
                });
        }

        /// <summary>
        /// Runs <paramref name="action"/> under this provider's lock, handing it the current channel,
        /// the exchange name and the queue name. The exchange/queue declaration is attempted first;
        /// its result is not inspected here.
        /// </summary>
        /// <param name="action">Callback receiving (channel, exchangeName, queue).</param>
        private void Query(Action<IModel, string, string> action) {
            lock (_lockObject) {
                var settings = ConnectionProviderFactory;
                var exchange = settings.ExchangeName;
                var queueName = settings.Queue;
                // Model is resolved separately for each call, exactly as two property reads.
                CreateQueue(Model, exchange, queueName);
                action(Model, exchange, queueName);
            }
        }

        /// <summary>
        /// Bundles the RabbitMQ connection factory with the exchange and queue names
        /// derived from an endpoint (see GetConnectionFactory).
        /// </summary>
        internal class ProviderFactory {
            /// <summary>Factory used to open broker connections.</summary>
            public ConnectionFactory ConnectionFactory { get; set; }
            /// <summary>Exchange name; built as "ex" + queue name by GetConnectionFactory.</summary>
            public string ExchangeName { get; set; }
            /// <summary>Queue name; taken from the path of the parsed endpoint.</summary>
            public string Queue { get; set; }
        }

        /// <summary>
        /// Returns the shared connection for the endpoint/alias pair, opening one if none is
        /// cached or the cached one is no longer open.
        /// </summary>
        /// <param name="endpoint">Endpoint address used for the cache key and to build the connection factory.</param>
        /// <param name="appendTypeNameToEndpoint">Forwarded to GetConnectionFactory.</param>
        /// <param name="endpointAlias">Optional alias; part of the cache key.</param>
        /// <returns>The cached or newly created connection.</returns>
        private IConnection GetConnection(string endpoint, bool appendTypeNameToEndpoint, string endpointAlias = null) {
            string key = String.Concat(endpoint, endpointAlias ?? string.Empty);
            // Build from the arguments (not the instance properties) so the connection always
            // corresponds to the key it is cached under.
            Func<IConnection> open = () =>
                GetConnectionFactory(endpoint, appendTypeNameToEndpoint, endpointAlias).ConnectionFactory.CreateConnection();
            var connection = _connections.GetOrAdd(key, k => open());
            if (connection == null || !connection.IsOpen)
                // Only replace an entry that is still unusable: another thread may already have
                // reconnected, and replacing its live connection would orphan it.
                connection = _connections.AddOrUpdate(key, k => open(),
                    (k, current) => current != null && current.IsOpen ? current : open());
            return connection;
        }

        /// <summary>
        /// Returns the shared channel (IModel) for the endpoint/alias pair, creating one on the
        /// current connection if none is cached or the cached one is no longer open.
        /// </summary>
        /// <param name="endpoint">Endpoint address; part of the cache key.</param>
        /// <param name="appendTypeNameToEndpoint">Forwarded to GetConnection.</param>
        /// <param name="endpointAlias">Optional alias; part of the cache key.</param>
        /// <returns>The cached or newly created channel.</returns>
        private IModel GetModel(string endpoint, bool appendTypeNameToEndpoint, string endpointAlias = null) {
            string key = String.Concat(endpoint, endpointAlias ?? string.Empty);
            Func<IModel> open = () => GetConnection(endpoint, appendTypeNameToEndpoint, endpointAlias).CreateModel();
            var model = _models.GetOrAdd(key, k => open());
            if (model == null || !model.IsOpen)
                // Keep a channel that another thread has already reopened instead of replacing
                // it unconditionally, which would leave that channel open but unreachable.
                model = _models.AddOrUpdate(key, k => open(),
                    (k, current) => current != null && current.IsOpen ? current : open());
            return model;
        }

        /// <summary>Connection for this provider's endpoint settings, resolved on every read.</summary>
        private IConnection Connection {
            get {
                var current = GetConnection(Endpoint, AppendTypeNameToEndpoint, EndpointAlias);
                return current;
            }
        }

        /// <summary>Channel for this provider's endpoint settings, resolved on every read.</summary>
        private IModel Model {
            get {
                var current = GetModel(Endpoint, AppendTypeNameToEndpoint, EndpointAlias);
                return current;
            }
        }
        
        /// <summary>
        /// Returns the cached connection settings for an endpoint, building them on first use.
        /// The queue path is "[type name with '.' replaced by '_'][alias]" appended to the endpoint;
        /// the queue name is the path of the parsed endpoint and the exchange name is "ex" + queue.
        /// </summary>
        /// <param name="endpoint">Endpoint address. Must not be null.</param>
        /// <param name="appendTypeNameToEndpoint">When true, the TObject type name is included in the path.</param>
        /// <param name="endpointAlias">Optional suffix appended to the path.</param>
        /// <returns>Connection factory plus exchange and queue names.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="endpoint"/> is null.</exception>
        private ProviderFactory GetConnectionFactory(string endpoint, bool appendTypeNameToEndpoint, string endpointAlias = null) {
            if (endpoint == null) throw new ArgumentNullException("endpoint");
            var alias = endpointAlias ?? string.Empty;
            // All three inputs shape the result, so all three belong in the cache key.
            var key = String.Concat(endpoint, alias, appendTypeNameToEndpoint ? "|type" : string.Empty);
            return _factories.GetOrAdd(key, k =>
            {
                // Derive everything from the arguments; do not read instance properties or
                // modify the captured parameter.
                var path = (appendTypeNameToEndpoint ? typeof(TObject).FullName.Replace(".", "_") : string.Empty) + alias;
                var separator = endpoint.EndsWith("/", StringComparison.Ordinal) ? string.Empty : "/";
                var uri = EndpointUri.Parse(String.Concat(endpoint, separator, path));
                var queue = uri.Path;
                var exchangeName = "ex" + queue;
                var factory = new ConnectionFactory() { HostName = uri.Host, UserName = uri.Username, Password = uri.Password, Port = uri.Port };
                return new ProviderFactory() { ConnectionFactory = factory, ExchangeName = exchangeName, Queue = queue };
            });
        }

        /// <summary>Connection settings (factory, exchange, queue) for this provider's endpoint.</summary>
        private ProviderFactory ConnectionProviderFactory {
            get {
                var settings = GetConnectionFactory(Endpoint, AppendTypeNameToEndpoint, EndpointAlias);
                return settings;
            }
        }


        /// <summary>
        /// Removes all messages from this provider's queue. Broker errors propagate to the caller.
        /// </summary>
        public void Clear() {
            // Purge the declared queue name; Endpoint is the full address string, not a queue name.
            Query((ch, exchangeName, queue) => ch.QueuePurge(queue));
        }

        
        // True once the exchange, queue and binding have been declared by this instance.
        private bool _queueCreated = false;
        /// <summary>
        /// Declares the fanout exchange, the queue and their binding the first time it is called.
        /// </summary>
        /// <param name="ch">Channel used for the declarations.</param>
        /// <param name="exchangeName">Exchange to declare.</param>
        /// <param name="queue">Queue to declare; also used as the binding's routing key.</param>
        /// <returns>True when the topology was already declared or was declared now; false if any declaration threw.</returns>
        private bool CreateQueue(IModel ch, string exchangeName, string queue) {
            if (!_queueCreated) {
                try {
                    ch.ExchangeDeclare(exchangeName, ExchangeType.Fanout, true);
                    ch.QueueDeclare(queue, true, false, false, null);
                    ch.QueueBind(queue, exchangeName, queue, null);
                    _queueCreated = true;
                } catch {
                    // Best effort: failure is reported through the return value only.
                    return false;
                }
            }
            return true;
        }
        
        /// <summary>
        /// Scans the queue for the first message whose body deserializes to a non-null object and,
        /// when <paramref name="id"/> is given, whose MessageId equals it.
        /// </summary>
        /// <param name="id">Message id to match, or null to accept the first deserializable message.</param>
        /// <param name="deleteObjectOnRead">When true the matching message is acknowledged (removed from the queue).</param>
        /// <returns>The matching object, or null when nothing matched or any error occurred.</returns>
        /// <remarks>
        /// Messages are always fetched with manual acknowledgement. Fetching with automatic
        /// acknowledgement removes every message that is merely inspected, so a non-deleting lookup
        /// would drain the queue. Every fetched message that is not deleted is handed back to the
        /// broker for requeue once the scan ends; rejecting it mid-scan would make the next fetch
        /// return it again.
        /// </remarks>
        private TObject GetObjectByID(string id, bool deleteObjectOnRead) {
            var found = default(TObject);
            try {
                // Query already takes the provider lock; no extra locking is needed here.
                Query((ch, exchangeName, queue) =>
                {
                    var heldTags = new List<ulong>();
                    try {
                        while (true) {
                            var result = ch.BasicGet(queue, false);
                            if (result == null) break;
                            // Track the delivery before deserializing so that a malformed message is requeued too.
                            heldTags.Add(result.DeliveryTag);
                            var body = result.Body.Deserialize<TObject>();
                            if (body == null) continue;
                            if (id != null && result.BasicProperties.MessageId != id) continue;
                            if (deleteObjectOnRead) {
                                ch.BasicAck(result.DeliveryTag, false);
                                heldTags.RemoveAt(heldTags.Count - 1);
                            }
                            found = body;
                            break;
                        }
                    } catch (Exception) {
                        // Best effort: stop scanning on the first failure and report what was found so far.
                    }
                    // A closed channel makes the broker requeue unacknowledged deliveries by itself.
                    if (ch.IsOpen)
                        foreach (var tag in heldTags) ch.BasicReject(tag, true);
                });
            }
            catch {
                // Callers treat a null result as "not found"; connection or requeue failures are not surfaced.
            }
            return found;
        }

        #region IObjectProvider<TObject> Members

        /// <summary>
        /// Not supported by this provider.
        /// </summary>
        /// <exception cref="NotImplementedException">Always thrown.</exception>
        public bool Refresh(TObject obj) {
            throw new NotImplementedException();
        }

        /// <summary>
        /// Publishes every item of <paramref name="list"/> as a persistent message whose
        /// MessageId is the value produced by IDFunc for that item.
        /// </summary>
        /// <param name="list">Objects to publish.</param>
        /// <returns>True when all items were published; false if any exception occurred.</returns>
        public bool Add(IEnumerable<TObject> list) {
            try {
                Query((channel, exchange, routingKey) =>
                {
                    foreach (var item in list) {
                        var properties = channel.CreateBasicProperties();
                        properties.SetPersistent(true);
                        properties.DeliveryMode = 2;
                        properties.MessageId = IDFunc(item).ToString();
                        var payload = item.Serialize();
                        channel.BasicPublish(exchange, routingKey, properties, payload);
                    }
                });
                return true;
            } catch {
                // Any failure (including a partially published batch) is reported as false.
                return false;
            }
        }

        /// <summary>
        /// Publishes <paramref name="obj"/> as a persistent message whose MessageId is the value
        /// produced by IDFunc. Failures propagate as exceptions.
        /// </summary>
        /// <param name="obj">Object to publish.</param>
        /// <returns>Always true when no exception is thrown.</returns>
        public bool Add(TObject obj) {
            Query((channel, exchange, routingKey) =>
            {
                var properties = channel.CreateBasicProperties();
                properties.SetPersistent(true);
                properties.DeliveryMode = 2;
                properties.MessageId = IDFunc(obj).ToString();
                var payload = obj.Serialize();
                channel.BasicPublish(exchange, routingKey, properties, payload);
            });
            return true;
        }

        /// <summary>
        /// Removes this provider's binding, exchange and queue from the broker.
        /// Broker errors propagate to the caller.
        /// </summary>
        public void Delete() {
            Query((ch, exchangeName, queue) =>
            {
                try {
                    ch.QueueUnbind(queue, exchangeName, queue, null);
                    ch.ExchangeDelete(exchangeName, true);
                    ch.QueueDelete(queue, true, false);
                } finally {
                    // The topology may now be (partly) gone, so the next operation must declare it
                    // again. Without this reset CreateQueue keeps skipping the declaration and
                    // every later call targets a queue that no longer exists.
                    _queueCreated = false;
                }
            });
        }

        /// <summary>
        /// Deletes each object in <paramref name="list"/> by the id IDFunc produces for it.
        /// Every item is attempted even after a failure.
        /// </summary>
        /// <param name="list">Objects to delete.</param>
        /// <returns>True only if every item was found and deleted.</returns>
        public bool Delete(IEnumerable<TObject> list) {
            var allDeleted = true;
            foreach (var item in list) {
                var removed = GetObjectByID(IDFunc(item).ToString(), true);
                if (removed == null) allDeleted = false;
            }
            return allDeleted;
        }

        /// <summary>
        /// For each condition, finds the first non-null object satisfying it and deletes that
        /// object by the id IDFunc produces for it. Every condition is attempted even after a failure.
        /// </summary>
        /// <param name="conditions">Predicates selecting the objects to delete.</param>
        /// <returns>True only if every condition matched an object that was then deleted.</returns>
        public bool Delete(IEnumerable<Func<TObject, bool>> conditions) {
            var allDeleted = true;
            foreach (var condition in conditions) {
                var target = this.FirstOrDefault(candidate => candidate != null && condition(candidate));
                // The id lookup only runs when a match was found.
                if (target == null || GetObjectByID(IDFunc(target).ToString(), true) == null)
                    allDeleted = false;
            }
            return allDeleted;
        }

        /// <summary>
        /// Finds the first non-null object satisfying <paramref name="condition"/> and deletes it
        /// by the id IDFunc produces for it.
        /// </summary>
        /// <param name="condition">Predicate selecting the object to delete.</param>
        /// <returns>True if a match was found and deleted; otherwise false.</returns>
        public bool Delete(Func<TObject, bool> condition) {
            var target = this.FirstOrDefault(candidate => candidate != null && condition(candidate));
            if (target == null) return false;
            var removed = GetObjectByID(IDFunc(target).ToString(), true);
            return removed != null;
        }

        /// <summary>Reports whether any enumerated object satisfies <paramref name="condition"/>.</summary>
        /// <param name="condition">Predicate to test.</param>
        /// <returns>True when Get(condition) yields a non-null object.</returns>
        public bool Exists(Func<TObject, bool> condition) {
            var match = Get(condition);
            return match != null;
        }

        /// <summary>Returns the first enumerated object satisfying <paramref name="condition"/>.</summary>
        /// <param name="condition">Predicate to test.</param>
        /// <returns>The first match, or null when there is none.</returns>
        public TObject Get(Func<TObject, bool> condition) {
            Func<TObject, bool> matches = candidate => condition(candidate);
            return this.FirstOrDefault(matches);
        }

        /// <summary>Reports whether a message with the id IDFunc produces for <paramref name="obj"/> is found.</summary>
        /// <param name="obj">Object whose id is looked up.</param>
        /// <returns>True when the lookup returns a non-null object.</returns>
        public bool Exists(TObject obj) {
            var objectId = IDFunc(obj).ToString();
            var match = GetObjectByID(objectId, false);
            return match != null;
        }

        /// <summary>Looks up the object whose message id equals <paramref name="id"/>, without deleting it.</summary>
        /// <param name="id">Id to search for.</param>
        /// <returns>The matching object, or null when none is found.</returns>
        public TObject Get(Guid id) {
            var objectId = id.ToString();
            return GetObjectByID(objectId, false);
        }

        /// <summary>Deletes the message whose id is the value IDFunc produces for <paramref name="obj"/>.</summary>
        /// <param name="obj">Object to delete.</param>
        /// <returns>True when a matching message was found and deleted.</returns>
        public bool Delete(TObject obj) {
            var objectId = IDFunc(obj).ToString();
            var removed = GetObjectByID(objectId, true);
            return removed != null;
        }

        /// <summary>Endpoint address; parsed with EndpointUri.Parse to obtain host, credentials, port and queue path.</summary>
        public string Endpoint { get; set; }

        /// <summary>Optional suffix appended to the queue path derived from the endpoint.</summary>
        public string EndpointAlias { get; set; }

        /// <summary>When true, the full type name of TObject (with '.' replaced by '_') is included in the queue path.</summary>
        public bool AppendTypeNameToEndpoint { get; set; }

        /// <summary>Controls acknowledgement during enumeration: when true each fetched message is explicitly acknowledged.</summary>
        public bool DeleteObjectOnRead { get; set; }

        /// <summary>Produces the id for an object; used as the MessageId when publishing and as the lookup key for id-based operations.</summary>
        public Func<TObject, Guid> IDFunc { get; set; }
        #endregion

        #region IEnumerable<TObject> Members

        /// <summary>
        /// Enumerates the objects currently in the queue by fetching messages one at a time.
        /// </summary>
        /// <returns>An enumerator over the deserialized, non-null message bodies.</returns>
        /// <remarks>
        /// Messages are fetched with manual acknowledgement. When DeleteObjectOnRead is true each
        /// message is acknowledged (removed) as it is read. Otherwise the messages are held
        /// unacknowledged and returned to the queue when the enumerator is disposed, so reading
        /// does not consume them. Fetching with automatic acknowledgement would remove them.
        /// foreach and LINQ operators dispose the enumerator; an abandoned enumerator keeps its
        /// messages held until the channel closes.
        /// Enumeration stops silently on the first broker error.
        /// </remarks>
        public IEnumerator<TObject> GetEnumerator() {
            var factory = ConnectionProviderFactory;
            CreateQueue(Model, factory.ExchangeName, factory.Queue);
            // Delivery tags are only valid on the channel that produced them, so use one
            // channel for the fetch, the acknowledgement and the final requeue.
            var channel = Model;
            var deleteOnRead = DeleteObjectOnRead;
            var heldTags = new List<ulong>();
            try {
                while (true) {
                    // Reset per message: a failed deserialization must not re-yield the previous object.
                    TObject body = null;
                    try {
                        var result = channel.BasicGet(factory.Queue, false);
                        if (result == null) break;
                        if (!deleteOnRead) heldTags.Add(result.DeliveryTag);
                        MethodHelper.Try(() => body = result.Body.Deserialize<TObject>());
                        if (deleteOnRead) channel.BasicAck(result.DeliveryTag, false);
                    } catch (Exception) { break; }
                    if (body != null) yield return body;
                }
            } finally {
                // A closed channel makes the broker requeue unacknowledged deliveries by itself.
                if (heldTags.Count > 0 && channel.IsOpen) {
                    MethodHelper.Try(() => {
                        foreach (var tag in heldTags) channel.BasicReject(tag, true);
                    });
                }
            }
        }

        #endregion

        #region IEnumerable Members

        /// <summary>Non-generic enumeration; forwards to the generic enumerator.</summary>
        IEnumerator IEnumerable.GetEnumerator() {
            IEnumerator<TObject> typed = GetEnumerator();
            return typed;
        }

        #endregion

        #region IDisposable Members

        /// <summary>
        /// Closes every cached channel and connection and empties the channel, connection and
        /// factory caches.
        /// </summary>
        /// <remarks>
        /// The caches are static, so this affects every provider of the same TObject, not only
        /// this instance. Cached provider instances are kept.
        /// </remarks>
        public void Dispose() {
            // Close each resource in its own guarded call so that one failing Close() does not
            // leave all remaining channels/connections open.
            foreach (var entry in _models) {
                var model = entry.Value;
                MethodHelper.Try(() => { model.Close(); });
            }
            foreach (var entry in _connections) {
                var connection = entry.Value;
                MethodHelper.Try(() => { connection.Close(); });
            }
            _models.Clear();
            _connections.Clear();
            _factories.Clear();
        }

        #endregion
    }
}
